如何将2D数组作为多处理.阵列转到多处理.

您所在的位置:网站首页 python sharedctypes 如何将2D数组作为多处理.阵列转到多处理.

如何将2D数组作为多处理.阵列转到多处理.

2023-04-23 08:09| 来源: 网络整理| 查看: 265

百度翻译此文   有道翻译此文 问题描述

My aim is to pass a parent array to mp.Pool and fill it with 2s while distributing it to different processes. This works for arrays of 1 dimension:

import numpy as np import multiprocessing as mp import itertools def worker_function(i=None): global arr val = 2 arr[i] = val print(arr[:]) def init_arr(arr=None): globals()['arr'] = arr def main(): arr = mp.Array('i', np.zeros(5, dtype=int), lock=False) mp.Pool(1, initializer=init_arr, initargs=(arr,)).starmap(worker_function, zip(range(5))) print(arr[:]) if __name__ == '__main__': main()

Output:

[2, 0, 0, 0, 0] [2, 2, 0, 0, 0] [2, 2, 2, 0, 0] [2, 2, 2, 2, 0] [2, 2, 2, 2, 2] [2, 2, 2, 2, 2]

But how can I do the same for x-dimensional arrays? Adding a dimension to arr:

arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)

produces an error:

Traceback (most recent call last): File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 23, in main() File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 17, in main arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False) File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\context.py", line 141, in Array ctx=self.get_context()) File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 88, in Array obj = RawArray(typecode_or_type, size_or_initializer) File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 67, in RawArray result.__init__(*size_or_initializer) TypeError: only size-1 arrays can be converted to Python scalars

Changing the dtype of arr does not help either.

推荐答案

You can't directly use multiprocessing.Array as a 2-d array, but in one-dimensional memory, the second dimension is just an illusion anyway :).

Luckily numpy allows reading an array from buffer and reshaping it without the need to copy it. In the demo below I just use a separate lock so we can observe the changes made step by step, there's currently no race condition for what it's doing.

import multiprocessing as mp import numpy as np def worker_function(i): global arr, arr_lock val = 2 with arr_lock: arr[i, :i+1] = val print(f"{mp.current_process().name}\n{arr[:]}") def init_arr(arr, arr_lock=None): globals()['arr'] = np.frombuffer(arr, dtype='int32').reshape(5, 5) globals()['arr_lock'] = arr_lock def main(): arr = mp.Array('i', np.zeros(5 * 5, dtype='int32'), lock=False) arr_lock = mp.Lock() mp.Pool(2, initializer=init_arr, initargs=(arr, arr_lock)).map( worker_function, range(5) ) arr = np.frombuffer(arr, dtype='int32').reshape(5, 5) print(f"{mp.current_process().name}\n{arr}") if __name__ == '__main__': main()

Output:

ForkPoolWorker-1 [[2 0 0 0 0] [0 0 0 0 0] [0 0 0 0 0] [0 0 0 0 0] [0 0 0 0 0]] ForkPoolWorker-2 [[2 0 0 0 0] [2 2 0 0 0] [0 0 0 0 0] [0 0 0 0 0] [0 0 0 0 0]] ForkPoolWorker-1 [[2 0 0 0 0] [2 2 0 0 0] [2 2 2 0 0] [0 0 0 0 0] [0 0 0 0 0]] ForkPoolWorker-2 [[2 0 0 0 0] [2 2 0 0 0] [2 2 2 0 0] [2 2 2 2 0] [0 0 0 0 0]] ForkPoolWorker-1 [[2 0 0 0 0] [2 2 0 0 0] [2 2 2 0 0] [2 2 2 2 0] [2 2 2 2 2]] MainProcess [[2 0 0 0 0] [2 2 0 0 0] [2 2 2 0 0] [2 2 2 2 0] [2 2 2 2 2]] Process finished with exit code 0


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3